
Conversation

@jason810496
Member

related: #49470

Why

As noted in #49470 (comment), PR #49470 unintentionally broke the display format of CloudWatchHandler.

What

This PR fixes the display issue for CloudWatchHandler and ensures logs display properly across different airflow-core versions.

@jason810496
Member Author

jason810496 commented Aug 3, 2025

Just tested the fixed CloudwatchTaskHandler with different airflow-core version setups, and all of them display correctly.
I will keep working on fixing the CI.

I set up the following airflow-core version matrix against the fixed CloudwatchTaskHandler.

  • 3.1.0 (unreleased)
    • On the main branch, run breeze start-airflow --python 3.10 --backend postgres --integration localstack --mount-sources providers-and-tests
  • 3.0.4 (unreleased)
    1. git checkout v3-0-test && rm dist/*
    2. breeze release-management prepare-airflow-distributions && breeze release-management prepare-task-sdk-distributions
    3. breeze start-airflow --python 3.10 --backend postgres --integration localstack --mount-sources providers-and-tests --use-airflow-version wheel
  • 3.0.3
    • On the main branch, run breeze start-airflow --python 3.10 --backend postgres --integration localstack --mount-sources providers-and-tests --use-airflow-version 3.0.3

After the fix, the logs are formatted correctly on the frontend; screenshots below.

  • 3.1.0 (unreleased): (screenshot)
  • 3.0.4 (unreleased): (screenshot)
  • 3.0.3: (screenshot)

cc @ashb @eladkal

Member

@ashb ashb left a comment

This still goes and loads everything into memory for Cloudwatch, doesn't it? As we call io.read(), which does [x for x in self.stream()] -- I wonder if the read code in file_task_handler should check for an io.stream function and use that in preference over io.read?

That might be a bit complex, but either way I think we need a bit more thought here so we don't undo all the changes you made for paging/OOM of logs.
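
For illustration, a minimal sketch of the kind of preference check this suggests, assuming the remote IO object exposes both methods; the helper name and signature are hypothetical, not actual file_task_handler code:

def _read_remote(remote_io, relative_path, ti):
    """Prefer a lazy `stream` over a materializing `read` when the IO object offers one."""
    if hasattr(remote_io, "stream"):
        # stream-based: returns generators that are consumed lazily
        return remote_io.stream(relative_path, ti)
    # legacy: returns the whole log as a list of strings held in memory
    return remote_io.read(relative_path, ti)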

@jason810496
Member Author

jason810496 commented Aug 4, 2025

This still goes and loads everything into memory for Cloudwatch doesn't it?

Yes, I'm still working locally on setting up the stream-based read with backward compatibility.
Since #49470 was merged, airflow-core after 3.0.3 ( get_base_airflow_version_tuple() >= (3, 0, 3) ) can accept a log stream.

As we call io.read() which does [x for x in self.stream()] -- I wonder if the read code in file_task_handler should check for an io.stream function and use that in preference over io.read?

I had considered a compatibility mechanism in #49470 by checking the return type of _read_remote_logs.

https://github.com/apache/airflow/blob/main/airflow-core/src/airflow/utils/log/file_task_handler.py#L611-L625

So we just need to make sure that the provider returns the stream-based format when running with airflow-core after 3.0.3.
Something like:

# in the `version_compat` module,
# or maybe just use `get_base_airflow_version_tuple() >= (3, 0, 3)` directly in the provider FileTaskHandler
STREAM_BASED_READ = get_base_airflow_version_tuple() >= (3, 0, 3)

# in the provider FileTaskHandler
if STREAM_BASED_READ:
    return messages, log_streams  # generators of StructuredLogMessage or str
else:
    return messages, log_list  # list of str (not memory efficient, but only airflow-core after 3.0.3 supports stream-based read)
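
On the airflow-core side, accepting both formats could conceptually look like the sketch below; the helper name and logic are an assumption for illustration, not the actual file_task_handler implementation:

from collections.abc import Generator

def _normalize_remote_logs(remote_logs):
    """Accept the legacy list-of-str format and the newer stream-based format."""
    for item in remote_logs:
        if isinstance(item, Generator):
            # stream-based read: yield from the generator lazily
            yield from item
        else:
            # legacy read: plain strings, yield as-is
            yield item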

@jason810496 jason810496 force-pushed the fix/logging/cloudwatch-handler-error branch from 7e5785d to de079f2 Compare August 10, 2025 14:25
@jason810496 jason810496 marked this pull request as ready for review August 10, 2025 16:09
@jason810496
Member Author

Just tested the latest provider with the following version matrix and also checked the memory usage.

  • Version matrix for compatibility
    • 3.0.3
    • 3.0.4 -> starts supporting stream-based read
    • main (commit 5d302eb61f5878282c57105b12be0a8d1de936df)
  • Memory usage comparison
    • 3.0.3
    • 3.0.4 -> should not show a memory-usage peak when reading a task instance log

Compatibility

3.0.3

Setup:

breeze start-airflow --python 3.10 --backend postgres --integration localstack --mount-sources providers-and-tests --use-airflow-version 3.0.3

Result: (screenshot)

3.0.4

Setup:

breeze start-airflow --python 3.10 --backend postgres --integration localstack --mount-sources providers-and-tests --use-airflow-version 3.0.4

Result: (screenshot)

3.1.x ( main branch )

Setup:

  1. Switch to main
  2. Run rm -rf dist/*
  3. Run breeze release-management prepare-airflow-distributions
  4. Run breeze release-management prepare-task-sdk-distributions
  5. Switch back to branch of this PR
  6. Run breeze start-airflow --use-airflow-version wheel --mount-sources providers-and-tests --integration localstack

Result: (screenshot)

Memory usage comparison

Setup:

  1. Run start-airflow and trigger a DagRun, then terminate all processes except the API server
  2. Call the get task instance log endpoint with a JSON Accept header (see the request sketch below)
  3. Call the get task instance log endpoint with an NDJSON Accept header
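
For reference, the two calls look roughly like the sketch below; the base URL, endpoint path, try number, and identifiers are assumptions about a local breeze setup (auth omitted for brevity), not taken from this PR:

import requests

# Placeholder identifiers for a local breeze deployment; adjust to the actual DagRun.
dag_id, run_id, task_id = "example_dag", "manual__2025-08-10T00:00:00", "example_task"
url = f"http://localhost:28080/api/v2/dags/{dag_id}/dagRuns/{run_id}/taskInstances/{task_id}/logs/1"

# JSON: the whole log body is materialized before the response is returned.
requests.get(url, headers={"Accept": "application/json"})

# NDJSON: with stream-based read, log lines can be emitted as they are fetched.
with requests.get(url, headers={"Accept": "application/x-ndjson"}, stream=True) as resp:
    for _line in resp.iter_lines():
        pass  # consume the stream without holding it all in memory
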
3.0.3 memory usage: (screenshot)

There is still a memory peak for the NDJSON header.

3.0.4 memory usage: (screenshot)

No memory peak for the NDJSON header.

@jason810496 jason810496 requested review from Lee-W and ashb August 10, 2025 16:24
 # These types are similar, but have distinct names to make processing them less error prone
 LogMessages: TypeAlias = list[str]
-"""The legacy format of log messages before 3.0.2"""
+"""The legacy format of log messages before 3.0.3"""
Member

I thought it was 3.0.4

Member

Yeah, the OOM log PR only got released in 3.0.4

class DateTimeEncoder(json.JSONEncoder):
    """Custom JSON encoder to handle datetime serialization."""

    def default(self, obj):
Member

Suggested change
-    def default(self, obj):
+    def default(self, obj: object) -> str:
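
For context, the encoder presumably ends up something like the sketch below; the body of default is not shown in the hunk above, so the datetime handling here is an assumption:

import json
from datetime import datetime

class DateTimeEncoder(json.JSONEncoder):
    """Custom JSON encoder to handle datetime serialization."""

    def default(self, obj: object) -> str:
        if isinstance(obj, datetime):
            return obj.isoformat()
        # Fall back to the base class, which raises TypeError for unsupported types.
        return super().default(obj)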

def read(self, relative_path, ti: RuntimeTI) -> LegacyLogResponse:
    messages, logs = self.stream(relative_path, ti)

    return messages, [
Member

might be worth making it a function. This list comprehension is not that easy to comprehend
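
A hypothetical shape of that refactor; the comprehension itself is truncated in the hunk above, so the helper name and body below are a guess, not the actual conversion:

def _streams_to_legacy_lines(logs) -> list[str]:
    """Flatten the streamed log generators into the legacy list-of-str format (hypothetical)."""
    return [str(message) for stream in logs for message in stream]

# read() would then become:
#     messages, logs = self.stream(relative_path, ti)
#     return messages, _streams_to_legacy_lines(logs)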

        logs = [(self._event_to_str(event) for event in events)]
    else:
        logs = ["\n".join(self._event_to_str(event) for event in events)]
except Exception as e:
Member

Not sure whether it's possible to list all the possible exceptions instead of using Exception
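
If narrowing is wanted, a possible sketch, assuming boto3/botocore is the underlying client; ClientError and BotoCoreError are the usual botocore exception bases, though whether they cover every failure mode here is an open question:

from botocore.exceptions import BotoCoreError, ClientError

try:
    events = self.io.get_cloudwatch_logs(stream_name, task_instance)
except (ClientError, BotoCoreError) as e:
    ...  # same fallback handling as before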

logs: list[str] | list[Generator[str, None, None]]
try:
    events = self.io.get_cloudwatch_logs(stream_name, task_instance)
    if SUPPORT_STREAM_BASED_READ:
Member

Read on CloudwatchTaskHandler should only ever be called for old Airflow Core versions (newer versions use CloudwatchRemoteIO directly instead) -- do we need this here?

@jason810496 jason810496 marked this pull request as draft August 21, 2025 04:47
@jason810496 jason810496 force-pushed the fix/logging/cloudwatch-handler-error branch from de079f2 to 4832d37 Compare August 21, 2025 10:24
@eladkal eladkal added this to the Airflow 3.0.7 milestone Aug 22, 2025
@eladkal eladkal added the type:bug-fix Changelog: Bug Fixes label Aug 22, 2025
Contributor

Are the changes to this file backward compatible with Airflow 2.10? PRs that change both core and providers may hide compatibility issues.

Member Author

Yes, and I agree.
I will test the CloudWatchHandler with Airflow 2.10 as well later on.

@jason810496 jason810496 force-pushed the fix/logging/cloudwatch-handler-error branch from ffa3797 to 5a775df Compare September 6, 2025 17:21
@potiuk potiuk force-pushed the fix/logging/cloudwatch-handler-error branch from 30a2528 to 847a238 Compare October 25, 2025 20:52

Labels

area:logging, area:providers, provider:amazon (AWS/Amazon - related issues), type:bug-fix (Changelog: Bug Fixes)
